// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "cloud/cloud_index_change_compaction.h"

#include <gtest/gtest.h>

#include "cloud/cloud_base_compaction.h"
#include "cloud/cloud_cumulative_compaction.h"
#include "cpp/sync_point.h"
#include "json2pb/json_to_pb.h"
#include "olap/rowset/beta_rowset.h"

namespace doris {

class CloudIndexChangeCompactionTest : public testing::Test {
public:
    void SetUp() {
        _engine = std::make_unique<CloudStorageEngine>(EngineOptions {});
        auto sp = SyncPoint::get_instance();
        sp->enable_processing();
    }

    void TearDown() {}

    void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
        std::string json_rowset_meta = R"({
            "rowset_id": 540085,
            "tablet_id": 15674,
            "partition_id": 10000,
            "txn_id": 4045,
            "tablet_schema_hash": 567997588,
            "rowset_type": "BETA_ROWSET",
            "rowset_state": "VISIBLE",
            "start_version": 2,
            "end_version": 2,
            "num_rows": 3929,
            "total_disk_size": 84699,
            "data_disk_size": 84464,
            "index_disk_size": 235,
            "empty": false,
            "load_id": {
                "hi": -5350970832824939812,
                "lo": -6717994719194512122
            },
            "creation_time": 1553765670
        })";

        RowsetMetaPB rowset_meta_pb;
        json2pb::JsonToProtoMessage(json_rowset_meta, &rowset_meta_pb);
        rowset_meta_pb.set_start_version(start);
        rowset_meta_pb.set_end_version(end);
        rowset_meta_pb.set_creation_time(10000);

        pb1->init_from_pb(rowset_meta_pb);
    }

    bool contains_str(std::string origin, std::string sub_str) {
        return origin.find(sub_str) != std::string::npos;
    }

public:
    std::unique_ptr<CloudStorageEngine> _engine;
};

// Checks how CloudIndexChangeCompaction maps mocked meta-service results to the
// Status returned from prepare_compact(), request_global_lock() and
// modify_rowsets().
//
// The meta-service calls are replaced through SyncPoint callbacks. Each callback
// writes the Status to return into `pairs->first` and sets `pairs->second` to
// true (presumably the flag that makes the instrumented function return the
// injected value); some additionally fill the response protobuf passed as
// `outcome[1]` with a specific meta-service status code.
TEST_F(CloudIndexChangeCompactionTest, ms_ret_status_test) {
    std::vector<TOlapTableIndex> index_list;
    TOlapTableIndex index;
    index.index_id = 1;
    index.columns.emplace_back("col1");
    index_list.push_back(index);

    // Single-column duplicate-key schema used by the test rowset.
    TabletSchemaPB schema_pb;
    schema_pb.set_keys_type(KeysType::DUP_KEYS);

    ColumnPB* column_pb = schema_pb.add_column();
    column_pb->set_unique_id(10000);
    column_pb->set_name("col1");
    column_pb->set_type("int");
    column_pb->set_is_key(true);
    column_pb->set_is_nullable(true);
    auto tablet_schema = std::make_shared<TabletSchema>();
    tablet_schema->init_from_pb(schema_pb);

    auto rowset_meta = std::make_shared<RowsetMeta>();
    init_rs_meta(rowset_meta, 1, 1);
    rowset_meta->set_num_segments(2);
    RowsetSharedPtr rowset_ptr = std::make_shared<BetaRowset>(tablet_schema, rowset_meta, "");

    TabletMetaPB input_meta_pb;
    input_meta_pb.set_tablet_id(1000);
    input_meta_pb.set_schema_hash(123456);
    input_meta_pb.set_tablet_state(PB_RUNNING);
    *input_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto();

    auto tablet_meta = std::make_shared<TabletMeta>();
    tablet_meta->init_from_pb(input_meta_pb);

    CloudTabletSPtr tablet = std::make_shared<CloudTablet>(*_engine, tablet_meta);

    Version v1;
    v1.first = 1;
    v1.second = 2;
    tablet->_rs_version_map[v1] = rowset_ptr;

    std::vector<TColumn> columns;

    auto* sp = SyncPoint::get_instance();

    // Creates a new compaction task on the shared tablet (schema version 0).
    auto new_compaction = [&]() {
        return std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list,
                                                            columns);
    };
    // Same as above, with the test rowset already registered as the only input.
    auto new_compaction_with_input = [&]() {
        auto compaction = new_compaction();
        compaction->_input_rowsets.push_back(rowset_ptr);
        return compaction;
    };
    // Compaction prepared for modify_rowsets(): input and output rowset set, BASE type.
    auto new_compaction_for_commit = [&]() {
        auto compaction = new_compaction_with_input();
        compaction->_output_rowset = rowset_ptr;
        compaction->_compact_type = cloud::TabletCompactionJobPB::BASE;
        return compaction;
    };

    // 0 prepare must be rejected while the tablet is not in a valid state
    {
        Status ret_set = tablet->set_tablet_state(TabletState::TABLET_TOMBSTONED);
        // Verify the precondition before exercising the code that depends on it.
        ASSERT_TRUE(ret_set.ok());

        auto index_change_compact = new_compaction();
        Status prepare_ret = index_change_compact->prepare_compact();
        ASSERT_FALSE(prepare_ret.ok());
        ASSERT_TRUE(contains_str(prepare_ret.to_string(), "invalid tablet state"));
    }

    Status ret_set2 = tablet->set_tablet_state(TabletState::TABLET_RUNNING);
    ASSERT_TRUE(ret_set2.ok());

    // 1 test prepare
    {
        // sync meta succeeds
        sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::OK();
        });

        auto index_change_compact = new_compaction();
        Status prepare_ret = index_change_compact->prepare_compact();
        ASSERT_TRUE(prepare_ret.ok());
    }

    {
        // sync meta fails; the error must be propagated
        sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::InternalError<false>("mock sync meta failed");
        });

        auto index_change_compact = new_compaction();
        Status prepare_ret = index_change_compact->prepare_compact();
        ASSERT_FALSE(prepare_ret.ok());
        ASSERT_TRUE(contains_str(prepare_ret.to_string(), "mock sync meta failed"));
    }

    // 2 test request global lock
    {
        // prepare job succeeds
        sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::OK();
        });

        auto index_change_compact = new_compaction_with_input();
        bool should_skip_err = false;
        Status ret = index_change_compact->request_global_lock(should_skip_err);
        ASSERT_TRUE(ret.ok());
        ASSERT_FALSE(should_skip_err);
    }

    {
        // stale tablet error: the only case expected to set should_skip_err
        sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::InternalError<false>("mock stale tablet error");

            auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
            resp->mutable_status()->set_code(cloud::STALE_TABLET_CACHE);
            resp->mutable_status()->set_msg("mock stale tablet error");
        });

        auto index_change_compact = new_compaction_with_input();
        bool should_skip_err = false;
        Status ret = index_change_compact->request_global_lock(should_skip_err);
        ASSERT_FALSE(ret.ok());
        ASSERT_TRUE(should_skip_err);
        ASSERT_TRUE(contains_str(ret.to_string(), "mock stale tablet error"));
    }

    {
        // tablet not found
        sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::InternalError<false>("mock tablet not found");

            auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
            resp->mutable_status()->set_code(cloud::TABLET_NOT_FOUND);
            resp->mutable_status()->set_msg("mock tablet not found");
        });

        auto index_change_compact = new_compaction_with_input();
        bool should_skip_err = false;
        Status ret = index_change_compact->request_global_lock(should_skip_err);
        ASSERT_FALSE(ret.ok());
        ASSERT_FALSE(should_skip_err);
        ASSERT_TRUE(contains_str(ret.to_string(), "mock tablet not found"));
    }

    {
        // job tablet busy: the mocked message is replaced by a compaction-specific one
        sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::InternalError<false>("mock job tablet busy");

            auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
            resp->mutable_status()->set_code(cloud::JOB_TABLET_BUSY);
            resp->mutable_status()->set_msg("mock job tablet busy");
        });

        auto index_change_compact = new_compaction_with_input();
        bool should_skip_err = false;
        Status ret = index_change_compact->request_global_lock(should_skip_err);
        ASSERT_FALSE(ret.ok());
        ASSERT_FALSE(should_skip_err);
        ASSERT_TRUE(contains_str(ret.to_string(), "index change compaction no suitable versions"));
    }

    {
        // job check alter version
        sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::InternalError<false>("mock job check alter version");

            auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
            resp->mutable_status()->set_code(cloud::JOB_CHECK_ALTER_VERSION);
            resp->mutable_status()->set_msg("mock job check alter version");
        });

        auto index_change_compact = new_compaction_with_input();
        bool should_skip_err = false;
        Status ret = index_change_compact->request_global_lock(should_skip_err);
        ASSERT_FALSE(ret.ok());
        ASSERT_FALSE(should_skip_err);
        ASSERT_TRUE(contains_str(ret.to_string(), "failed in schema change."));
    }

    // 3 test modify rowsets
    {
        // tablet not found
        sp->set_call_back("CloudMetaMgr::commit_tablet_job", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::InternalError<false>("mock tablet not found");

            auto* resp = try_any_cast<cloud::FinishTabletJobResponse*>(outcome[1]);
            resp->mutable_status()->set_code(cloud::TABLET_NOT_FOUND);
            resp->mutable_status()->set_msg("mock tablet not found");
        });

        auto index_change_compact = new_compaction_for_commit();
        Status ret = index_change_compact->modify_rowsets();
        ASSERT_FALSE(ret.ok());
        ASSERT_TRUE(contains_str(ret.to_string(), "mock tablet not found"));
    }

    {
        // job check alter version
        sp->set_call_back("CloudMetaMgr::commit_tablet_job", [](auto&& outcome) {
            auto* pairs = try_any_cast_ret<Status>(outcome);
            pairs->second = true;
            pairs->first = Status::InternalError<false>("mock check alter version");

            auto* resp = try_any_cast<cloud::FinishTabletJobResponse*>(outcome[1]);
            resp->mutable_status()->set_code(cloud::JOB_CHECK_ALTER_VERSION);
            resp->mutable_status()->set_msg("mock check alter version");
        });

        auto index_change_compact = new_compaction_for_commit();
        Status ret = index_change_compact->modify_rowsets();
        ASSERT_FALSE(ret.ok());
        ASSERT_TRUE(contains_str(ret.to_string(), "failed in schema change"));
    }
}

// Exercises three pieces of CloudIndexChangeCompaction without any meta-service
// interaction:
//   1. rebuild_tablet_schema() produces a final schema and stamps the current
//      schema with the schema version passed to the constructor (123);
//   2. is_index_change_compaction() is true only for the index-change task;
//   3. Compaction::set_delete_predicate_for_output_rowset() carries the input
//      rowset's delete predicate over to the output rowset.
TEST_F(CloudIndexChangeCompactionTest, basic_compaction_test) {
    TabletSchemaPB schema_pb;
    schema_pb.set_keys_type(KeysType::DUP_KEYS);

    ColumnPB* column_pb = schema_pb.add_column();
    column_pb->set_unique_id(10000);
    column_pb->set_name("col1");
    column_pb->set_type("int");
    column_pb->set_is_key(true);
    column_pb->set_is_nullable(true);
    auto tablet_schema1 = std::make_shared<TabletSchema>();
    tablet_schema1->init_from_pb(schema_pb);

    // Input rowset, versions [0, 1].
    auto rowset_meta = std::make_shared<RowsetMeta>();
    init_rs_meta(rowset_meta, 0, 1);
    rowset_meta->set_num_segments(2);
    rowset_meta->_schema = tablet_schema1;
    RowsetSharedPtr rowset_ptr = std::make_shared<BetaRowset>(tablet_schema1, rowset_meta, "");

    // Output rowset, re-versioned to [10, 10] after initialization.
    auto output_rowset_meta = std::make_shared<RowsetMeta>();
    init_rs_meta(output_rowset_meta, 0, 1);
    output_rowset_meta->set_num_segments(2);
    output_rowset_meta->_schema = tablet_schema1;
    Version v0;
    v0.first = 10;
    v0.second = 10;
    output_rowset_meta->set_version(v0);
    RowsetSharedPtr output_rowset_ptr =
            std::make_shared<BetaRowset>(tablet_schema1, output_rowset_meta, "");

    CloudTabletSPtr tablet =
            std::make_shared<CloudTablet>(*_engine, std::make_shared<TabletMeta>());

    Version v1;
    v1.first = 0;
    v1.second = 2;
    tablet->_rs_version_map[v1] = rowset_ptr;

    std::vector<TOlapTableIndex> index_list;
    std::vector<TColumn> columns;
    CloudIndexChangeCompaction cloud_index_change_compaction(*_engine, tablet, 123, index_list,
                                                             columns);

    // test rebuild_tablet_schema
    cloud_index_change_compaction._input_rowsets.clear();
    cloud_index_change_compaction._input_rowsets.push_back(rowset_ptr);
    Status re_build_schema = cloud_index_change_compaction.rebuild_tablet_schema();
    // Check the status first so a failed rebuild is reported as such instead of
    // surfacing as a null-pointer dereference below.
    ASSERT_TRUE(re_build_schema.ok());
    ASSERT_TRUE(cloud_index_change_compaction._final_tablet_schema != nullptr);
    ASSERT_TRUE(cloud_index_change_compaction._cur_tablet_schema != nullptr);
    ASSERT_EQ(cloud_index_change_compaction._cur_tablet_schema->schema_version(), 123);

    // test is_index_change_compaction
    CloudBaseCompaction cloud_base_compaction(*_engine, tablet);
    CloudCumulativeCompaction cloud_cumu_compaction(*_engine, tablet);
    ASSERT_TRUE(cloud_index_change_compaction.is_index_change_compaction());
    ASSERT_FALSE(cloud_base_compaction.is_index_change_compaction());
    ASSERT_FALSE(cloud_cumu_compaction.is_index_change_compaction());

    // test delete predicate
    cloud_index_change_compaction._input_rowsets.clear();
    cloud_index_change_compaction._input_rowsets.push_back(rowset_ptr);
    cloud_index_change_compaction._output_rowset = output_rowset_ptr;

    DeletePredicatePB del_pred_pb;
    InPredicatePB* in_pred = del_pred_pb.add_in_predicates();
    in_pred->set_column_name("col1");
    in_pred->set_is_not_in(true);
    in_pred->add_values("123");
    in_pred->add_values("456");
    rowset_ptr->rowset_meta()->set_delete_predicate(del_pred_pb);

    cloud_index_change_compaction._allow_delete_in_cumu_compaction = false;
    // Derived-to-base upcast: static_cast performs the proper pointer conversion,
    // whereas reinterpret_cast merely reinterprets the address.
    static_cast<Compaction*>(&cloud_index_change_compaction)
            ->set_delete_predicate_for_output_rowset();

    const auto& in_predicates = cloud_index_change_compaction._output_rowset->rowset_meta()
                                        ->delete_predicate()
                                        .in_predicates();
    // Guard the element access below.
    ASSERT_FALSE(in_predicates.empty());
    ASSERT_EQ(in_predicates[0].column_name(), "col1");
    ASSERT_TRUE(in_predicates[0].is_not_in());
    ASSERT_EQ(in_predicates[0].values().size(), 2);
}

// Covers the CloudTablet helpers used by index-change compaction:
//   * pick_a_rowset_for_index_change() yields no rowset for this tablet
//     (NOTE(review): presumably because its only rowset has zero rows -- confirm);
//   * check_rowset_schema_for_build_index() rejects a column whose name, unique
//     id or type differs from the rowset schema and accepts an exact match.
TEST_F(CloudIndexChangeCompactionTest, test_cloud_tablet) {
    // Rowset schema: one nullable int key column "col1" with unique id 10000.
    TabletSchemaPB schema_pb;
    schema_pb.set_keys_type(KeysType::DUP_KEYS);

    ColumnPB* col_pb = schema_pb.add_column();
    col_pb->set_unique_id(10000);
    col_pb->set_name("col1");
    col_pb->set_type("int");
    col_pb->set_is_key(true);
    col_pb->set_is_nullable(true);

    auto schema = std::make_shared<TabletSchema>();
    schema->init_from_pb(schema_pb);
    schema->set_schema_version(10);

    auto rs_meta = std::make_shared<RowsetMeta>();
    init_rs_meta(rs_meta, 1, 1);
    rs_meta->set_num_segments(2);
    rs_meta->set_num_rows(0);
    RowsetSharedPtr rowset = std::make_shared<BetaRowset>(schema, rs_meta, "");

    TabletMetaPB meta_pb;
    meta_pb.set_tablet_id(1000);
    meta_pb.set_schema_hash(123456);
    meta_pb.set_tablet_state(PB_RUNNING);
    *meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto();

    auto tablet_meta = std::make_shared<TabletMeta>();
    tablet_meta->init_from_pb(meta_pb);

    CloudTabletSPtr tablet = std::make_shared<CloudTablet>(*_engine, tablet_meta);

    Version version;
    version.first = 1;
    version.second = 2;
    tablet->_rs_version_map[version] = rowset;

    // Builds a one-element column list with the given name, unique id and type.
    auto single_column = [](const std::string& name, int32_t unique_id, auto primitive_type) {
        TColumnType column_type;
        column_type.__set_type(primitive_type);

        TColumn column;
        column.__set_column_name(name);
        column.__set_col_unique_id(unique_id);
        column.__set_column_type(column_type);

        std::vector<TColumn> result;
        result.emplace_back(std::move(column));
        return result;
    };

    {
        // no rowset is picked for index change
        bool is_base_rowset = false;
        auto picked = tablet->pick_a_rowset_for_index_change(1, is_base_rowset);
        ASSERT_TRUE(picked.value() == nullptr);
        ASSERT_FALSE(is_base_rowset);
    }

    {
        // column name unknown to the rowset schema
        auto t_columns = single_column("col2", 123, TPrimitiveType::STRING);
        Status st = tablet->check_rowset_schema_for_build_index(t_columns, 1000);
        ASSERT_FALSE(st.ok());
        ASSERT_TRUE(contains_str(st.to_string(), "col is dropped"));
    }

    {
        // name matches, unique id differs
        auto t_columns = single_column("col1", 123, TPrimitiveType::STRING);
        Status st = tablet->check_rowset_schema_for_build_index(t_columns, 1000);
        ASSERT_FALSE(st.ok());
        ASSERT_TRUE(contains_str(st.to_string(), "col id not match"));
    }

    {
        // name and unique id match, type differs
        auto t_columns = single_column("col1", 10000, TPrimitiveType::STRING);
        Status st = tablet->check_rowset_schema_for_build_index(t_columns, 1000);
        ASSERT_FALSE(st.ok());
        ASSERT_TRUE(contains_str(st.to_string(), "col type not match"));
    }

    {
        // everything matches
        auto t_columns = single_column("col1", 10000, TPrimitiveType::INT);
        Status st = tablet->check_rowset_schema_for_build_index(t_columns, 1000);
        ASSERT_TRUE(st.ok());
    }
}

} // namespace doris